package com.yz.kronos.schedule.handle;

import com.yz.kronos.schedule.model.KronosConfig;
import io.fabric8.kubernetes.api.model.batch.Job;
import io.fabric8.kubernetes.api.model.batch.JobList;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.utils.Utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Set;

/**
 * kubernetes job listener
 * @author shanchong
 * @date 2019-11-16
 **/
@Slf4j
@Service
public class ProcessListener {

    @Autowired
    private KronosConfig kronosConfig;
    @Autowired
    private List<ResourceEventHandler<Job>> resourceEventHandlerList;
    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @PostConstruct
    public void init() {
        Set<String> namespaces = kronosConfig.getNamespaces();
        for (String namespace : namespaces) {
            run(namespace);
        }
    }

    /**
     * 启动后开启一个守护进程触发
     */
    public void run(String namespace) {
        threadPoolTaskExecutor.execute(()->{
            String serviceApi = kronosConfig.getServiceApi();
            String groupName = kronosConfig.getGroupName();
            String apiVersion = kronosConfig.getApiVersion();
            Long resyncPeriodInMillis = kronosConfig.getResyncPeriodInMillis();
            CustomResourceDefinitionContext customResourceDefinitionContext = new CustomResourceDefinitionContext.Builder()
                    .withGroup(groupName)
                    .withPlural(Utils.getPluralFromKind(Job.class.getSimpleName()))
                    .withVersion(apiVersion)
                    .build();
            SharedIndexInformer<Job> sharedIndexInformer = new DefaultKubernetesClient(serviceApi)
                    .inNamespace(namespace).informers()
                    .sharedIndexInformerForCustomResource(customResourceDefinitionContext,Job.class, JobList.class,
                            resyncPeriodInMillis);
            sharedIndexInformer.run();
            for (ResourceEventHandler<Job> reh : resourceEventHandlerList) {
                sharedIndexInformer.addEventHandler(reh);
            }
        });
    }

}
